kafka消费能力优化|Java 开发实战 您所在的位置:网站首页 kafka java消费者 kafka消费能力优化|Java 开发实战

kafka消费能力优化|Java 开发实战

2022-12-14 11:02| 来源: 网络整理| 查看: 265

本文正在参加「Java主题月 - Java 开发实战」,详情查看 活动链接

这是我参与更文挑战的第1天,活动详情查看: 更文挑战

  Spring项目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的监听器)即可完成生产者-消费者的代码开发,相信这些,用过的同学都很清楚了,这里我不对spring-Kafka做过多讲解。我们今天主要来探讨一下如何提升kafka的消费能力。

1.简单的消费者 1.1 配置consumerFactory

首先要配置consumer的属性

@Bean(BeanNameConstant.CONSUMER_FACTORY) public ConsumerFactory consumerFactory() { final StringDeserializer stringDeserializer = new StringDeserializer(); Map props = new HashMap(10); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //设置是否自动提交offset 2.3 版本以后默认为false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(props, stringDeserializer, stringDeserializer); return consumerFactory; } 复制代码 1.2 配置KafkaListenerContainerFactory

关于consumer的主要的封装在ConcurrentKafkaListenerContainerFactory这个里头,本身的KafkaConsumer是线程不安全的,无法并发操作,这里spring又在包装了一层,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例

@Bean(BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY) public KafkaListenerContainerFactory kafkaListenerContainerFactory (@Qualifier(BeanNameConstant.CONSUMER_FACTORY) ConsumerFactory consumerFactory) { //构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听 ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory(); //可通过注解的方式进行设置 concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory); //手动ack concurrentKafkaListenerContainerFactory.getContainerProperties().setAckOnError(false); //设置ack模型机制 当发生error时 不同处理机制针对与offset有不同处理机制 concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return concurrentKafkaListenerContainerFactory; } 复制代码 1.3 消费者 @KafkaListener( topics = "${kafka-topic.demo}", containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY, concurrency = "1" ) public void loadListener(ConsumerRecord record, Acknowledgment ack){ try{ //业务方法 dealMessage(JsonUtil.readValue(String.valueOf(record.value()),Demo.class)); }catch (Exception e){ log.error("消费失败"); }finally { //手动提交ack ack.acknowledge(); } } 复制代码

ConsumerRecord类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用ConsumerRecord会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用String类型去接收消息体。通常我们会使用ConsumerRecord进行消费。

这里说下,如果ack未提交,consumer重启,consumer在rebalance后会从partition中重新拉去上一次的offset。可能会存在重复消费的情况。

2.多个消费者消费

  上面可以看到在@KafkaListener有一个属性concurrency 它是ConcurrentKafkaListenerContainerFactory的成员变量,我们在可以在配置KafkaListenerContainerFactory设置,也可以在每一个KafkaListener中对该配置进行覆盖。

/** * Specify the container concurrency. * @param concurrency the number of consumers to create. * @see ConcurrentMessageListenerContainer#setConcurrency(int) */ public void setConcurrency(Integer concurrency) { this.concurrency = concurrency; } 复制代码

它的作用是创建n个KafkaMessageListenerContainer实例,也就是n个kafkaconumser。是实现多个消费者消费的关键。

@KafkaListener( topics = "${kafka-topic.demo}", containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY, concurrency = "12" ) 复制代码

所以我们只要更改concurrency的数量就可以实现多线程消费了?这么简单?答案是否定的。

2.1 concurrency 如何设置

  concurrency的设置,取决于kafka的分区数据,也就是partition的数据量。因为一个KafkaMessageListenerContainer只会对一个分区进行消费。

  如果你的topic分区数只有8,那么你的concurrency最多只有8个可以正常工作。注意,如果是分布式系统,这里还要*节点数量。 如果你有两个节点,那么每个节点的concurrency应该设置为4

3.多线程批量消费

  通过对concurrency的设置,我们确实实现了多线程的消费,速度比之前快了。如果想在快点,我们不能无休止的增加partition的数量吧。partition扩上去可就缩不下来了。所以要另辟蹊径。

  官方提供的批量接口是这样的。也就是我们之前是用ConsumerRecord类接收消息,现在换成List就可以了。

public interface BatchMessageListener { void onMessage(List data); } public interface BatchAcknowledgingMessageListener { void onMessage(List data, Acknowledgment acknowledgment); } 复制代码

在此之前我们需要调整下consumer的配置

3.1 配置调整 3.1.1 配置consumerFactory //最大拉取条数2000 最大拉取时间1200s props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10000); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,1200000); 复制代码

max.poll.records 控制每次拉取的条数。

max.poll.interval.ms 每次poll的最大时间间隔。

max.poll.interval.ms 这个参数对于批量消费很重要。如果设置过短,消费者在未完成业务处理提交offset时,会再次poll一批数据,触发conusmer的rebalance。导致此前已经在消费的消息,分配给其他消费者再消费一次。然后走进死循环。所有消费者都一直在消费这段offset的数据。造成数据挤压和重复消费。 目前就本人实践而言,设置一个特别大的值没有什么影响。这里不是说过了max.poll.interval.ms的时间才去poll数据,还是说如果超过了这个时间都没poll,consumer回去触发一次。

3.1.2 配置KafkaListenerContainerFactory //构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听 ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory(); …… //是否并发消费 concurrentKafkaListenerContainerFactory.setBatchListener(true); 复制代码 3.2 代码实现 // 每个线程处理的最大数量 private static final int MAX_NUM = 100; @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${kafka-topic}",concurrency="3") public void loadListener(List> record) { for (ConsumerRecord consumerRecord : record) { //业务逻辑 this.dealMessage(consumerRecord); } } 复制代码

我的代码思路是,批量把消息拉取下来,多线程消费。 如果比较关心数据安全性和准确性,我们可以等这批数据全部处理完在提交offset。如果不是很在意,也可以扔到异步线程池里慢慢处理,offset直接提交。

public void loadListener(List consumerRecord : record) { executorService.execute(()->{ dealMessage(consumerRecord); }); } log.info("批量处理完成,处理数量={},耗时={}ms", record.size(), System.currentTimeMillis() - startTime); //手动确定 提交offset ack.acknowledge(); } 复制代码

无论是哪种,性能提升都会很明显。但是要注意每次拉取的数量和线程的设置要根据实际情况。测试时,注意cpu/内存 以及 DB 的监控情况及时进行调整。不要盲目追求数据处理能力,把别的业务都搞死了。性能优化的前提,一定是要保证系统的稳定性。 祝你好运~



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有